/*package com.easy.mq.listener.adapter;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.easy.mq.config.consumer.ManagerConsumerConfig;
import com.easy.mq.config.consumer.RabbitConsumerConfig;
import com.easy.mq.enums.ConsumeResultStatus;
import com.easy.mq.enums.XT;
import com.easy.mq.event.MQEvent;
import com.easy.mq.listener.AbstractMessageListenerContainer;
import com.easy.mq.listener.ManagerListener;
import com.easy.mq.serialization.SerializationFactory;
import com.easy.mq.utils.StringUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitNotifyListenerAdapter<T> extends AbstractMessageListenerContainer {

	private static final Logger logger = LoggerFactory.getLogger(RabbitNotifyListenerAdapter.class);

	private ManagerListener<T> managerListener;

	private ManagerConsumerConfig<T> managerConsumerConfig;

	public RabbitNotifyListenerAdapter(ManagerListener<T> managerConfig, ManagerConsumerConfig<T> managerConsumerConfig) {
		this.managerListener = managerConfig;
		this.managerConsumerConfig = managerConsumerConfig;
	}

	@Override
	public void registerMessageListener() {
		try {
			RabbitConsumerConfig rabbitConsumerConfig = ((RabbitConsumerConfig) managerConsumerConfig
					.getConsumer());
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(getHost(rabbitConsumerConfig));
			if (rabbitConsumerConfig.getPort() != null && rabbitConsumerConfig.getPort() > 0) {
				factory.setPort(rabbitConsumerConfig.getPort());
			}
			if (StringUtil.isNotEmpty(rabbitConsumerConfig.getUsername())) {
				factory.setUsername(rabbitConsumerConfig.getUsername());
			}

			if (StringUtil.isNotEmpty(rabbitConsumerConfig.getPassword())) {
				factory.setPassword(rabbitConsumerConfig.getPassword());
			}

			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();

			String queueName = rabbitConsumerConfig.getQueue();

			switch (XT.valueOf(rabbitConsumerConfig.getType())) {
			case DEFAULT:
				// 队列的相关参数需要与第一次定义该队列时相同，否则会出错，使用channel.queueDeclarePassive()可只被动绑定已有队列，而不创建
				channel.queueDeclare(queueName, rabbitConsumerConfig.isDurable(),
						rabbitConsumerConfig.isExclusive(), rabbitConsumerConfig.isAutoDelete(), null);
				break;
			case FANOUT:
				// 接收端也声明一个fanout交换机
				channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "fanout",
						rabbitConsumerConfig.isDurable(), rabbitConsumerConfig.isAutoDelete(), null);
				// channel.exchangeDeclarePassive() 可以使用该函数使用一个已经建立的exchange
				// 声明一个临时队列，该队列会在使用完比后自动销毁
				queueName = channel.queueDeclare().getQueue();
				// 将队列绑定到交换机,参数3无意义此时
				channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), "");
				break;
			case DIRECT:
				channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "direct", rabbitConsumerConfig.isDurable(),
						rabbitConsumerConfig.isAutoDelete(), null);
				queueName = channel.queueDeclare().getQueue();
				for(String value : rabbitConsumerConfig.getQueueBindList()) {
					channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), value); // 绑定一个routing key，可以绑定多个
				}
				break;
			case TOPIC:
				channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "topic",
						rabbitConsumerConfig.isDurable(), rabbitConsumerConfig.isAutoDelete(), null);
				queueName = channel.queueDeclare().getQueue();
				for(String value : rabbitConsumerConfig.getQueueBindList()) {
					channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), value); 
				}
				break;
			case HEADERS:
				channel.exchangeDeclare(rabbitConsumerConfig.getExchange(), "headers",
						rabbitConsumerConfig.isDurable(), rabbitConsumerConfig.isAutoDelete(), null);
				queueName = channel.queueDeclare().getQueue();

				channel.queueBind(queueName, rabbitConsumerConfig.getExchange(), "",
						rabbitConsumerConfig.getHeaderMap());
				break;
			}

			// 在同一时间不要给一个worker一个以上的消息。
			// 不要将一个新的消息分发给worker知道它处理完了并且返回了前一个消息的通知标志（acknowledged）
			// 替代的，消息将会分发给下一个不忙的worker。
			channel.basicQos(rabbitConsumerConfig.getPrefetchCount()); 

			// 用来缓存服务器推送过来的消息
			Consumer consumer = new DefaultConsumer(channel) {
	            @Override
	            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
	                    throws IOException {

				try {
					if(logger.isDebugEnabled()) {
						logger.debug("Received {} ", new String(body,"UTF-8"));
					}
					if (body == null || body.length <= 0) {
						return;
					}
					MQEvent<Object> message = new MQEvent<Object>();
					if (StringUtil.isNotEmpty(managerConsumerConfig.getParamType())) {
						try {
							if (!"java.lang.String".equals(managerConsumerConfig.getParamType())) {
								Class<?> cl = Class.forName(managerConsumerConfig.getParamType());
								message.setContent(
										SerializationFactory.factory(managerConsumerConfig.getTransfer())
												.deserializer(body, cl));
							} else {
								message.setContent(new String(body,"UTF-8"));
							}

						} catch (Exception e) {
							logger.error(e.getMessage() + new String(body,"UTF-8"), e);
						}
					} else {
						message.setContent(new String(body,"UTF-8"));
					}
					ConsumeResultStatus status = managerListener.notifyListener(managerConsumerConfig, message);
					if (ConsumeResultStatus.SUCCESS == status) {
						channel.basicAck(envelope.getDeliveryTag(), rabbitConsumerConfig.isAutoAck());
					}else {
						channel.basicNack(envelope.getDeliveryTag(), false, true);
					}
				} catch (Exception e) {
					logger.error(e.getMessage());
				}
	            }
	        };
			channel.basicConsume(queueName, rabbitConsumerConfig.isAutoAck(), consumer);
		}catch(Exception e) {
			logger.error(e.getMessage());
		}
	}

	

	private String getHost(RabbitConsumerConfig rabbitConsumerConfig) {
		final String address = rabbitConsumerConfig.getAddress();
		if (StringUtil.isNotEmpty(address) && address.indexOf(":") <= -1) {
			return address;
		}
		return address.substring(0, address.indexOf(":"));
	}


}
*/